﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

using iTool.Cloud.Database.iToolService;
using iTool.Cloud.Database.Options;
using iTool.Cloud.Database.ServiceProvider;
using iTool.ClusterComponent;
using iTool.Common;
using iTool.SQL.AIHandle;

using Microsoft.Data.Sqlite;

using Orleans;

namespace iTool.Cloud.Database.LocalServiceProvider
{
    public class AISQLProvider : iSqlProvider, IDisposable
    {
        private readonly Subject<BatchItem<AnalysisExecuteItemOptions>> subjectOfExecuteItems;
        private readonly IClusterService clusterService;

        /// <summary>
        /// Resolves the cluster service and wires up the buffered pipeline that
        /// turns individually queued statements into batched executions.
        /// </summary>
        public AISQLProvider()
        {
            clusterService = iBox.GetService<IClusterService>("IClusterService");
            subjectOfExecuteItems = new Subject<BatchItem<AnalysisExecuteItemOptions>>();

            // Flush a batch every 10 milliseconds or as soon as 200 items are queued,
            // whichever comes first. Concat runs the batches one after another.
            subjectOfExecuteItems.Buffer(TimeSpan.FromMilliseconds(10), 200)
                .Where(batch => batch.Count > 0)
                .Select(batch => Observable.FromAsync(() => this.ExecuteBatchGuardedAsync(batch)))
                .Concat()
                .Subscribe();
        }

        /// <summary>
        /// Runs one batch and makes sure a failure never reaches the observable
        /// pipeline: an error there would terminate the subscription, and every
        /// statement queued afterwards would wait forever.
        /// </summary>
        /// <param name="batch">The buffered items to execute.</param>
        private async Task ExecuteBatchGuardedAsync(IList<BatchItem<AnalysisExecuteItemOptions>> batch)
        {
            try
            {
                await this.BatchExecuteSqlsAsync(batch);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);

                // Release every caller of this batch that has not been completed yet.
                foreach (var item in batch)
                {
                    item?.TaskSource?.TrySetException(ex);
                }
            }
        }

        /// <summary>
        /// Executes one buffered batch: groups the statements by
        /// <c>Body.TableName</c>, sends each group to that table's
        /// <c>iTableExecuteService</c> with a single call, waits for all groups to
        /// finish and then completes the <c>TaskSource</c> of every item with the
        /// outcome of its own table.
        /// </summary>
        /// <param name="executeItems">The buffered items to execute.</param>
        /// <returns>
        /// A task that completes once every item has been signalled. Failures are
        /// reported through the items' task sources rather than thrown.
        /// </returns>
        private async Task BatchExecuteSqlsAsync(IList<BatchItem<AnalysisExecuteItemOptions>> executeItems)
        {
            try
            {
                var statementsByTable = new Dictionary<string, List<AnalysisExecuteItemOptions>>();
                var waitersByTable = new Dictionary<string, List<TaskCompletionSource<Task>>>();

                foreach (var item in executeItems)
                {
                    string tableName = item.Body.TableName;
                    if (!statementsByTable.TryGetValue(tableName, out var statements))
                    {
                        statements = new List<AnalysisExecuteItemOptions>();
                        statementsByTable.Add(tableName, statements);
                        waitersByTable.Add(tableName, new List<TaskCompletionSource<Task>>());
                    }

                    statements.Add(item.Body);
                    waitersByTable[tableName].Add(item.TaskSource);
                }

                // Start every table's batch before waiting so the tables run concurrently.
                var dispatched = new List<(string key, Task task)>(statementsByTable.Count);
                foreach (var group in statementsByTable)
                {
                    Task execution;
                    try
                    {
                        iTableExecuteService executeService = this.clusterService.GetService<iTableExecuteService>(group.Key);
                        execution = executeService.BatchExecuteNonQueryAsync(group.Value);
                    }
                    catch (Exception ex)
                    {
                        // A synchronous failure only affects this table's waiters.
                        execution = Task.FromException(ex);
                    }

                    dispatched.Add((group.Key, execution));
                }

                try
                {
                    await Task.WhenAll(dispatched.Select(entry => entry.task));
                }
                catch (Exception)
                {
                    // Each task's own outcome is inspected below; WhenAll is only used to wait.
                }

                foreach (var (key, task) in dispatched)
                {
                    foreach (var waiter in waitersByTable[key])
                    {
                        if (task.IsCompletedSuccessfully)
                        {
                            waiter.TrySetResult(Task.CompletedTask);
                        }
                        else if (task.IsFaulted)
                        {
                            // Pass the inner exceptions so awaiting callers see the real error.
                            waiter.TrySetException(task.Exception.InnerExceptions);
                        }
                        else
                        {
                            waiter.TrySetCanceled();
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                // Unexpected failure (for example a malformed item): do not leave any caller waiting.
                foreach (var item in executeItems)
                {
                    item?.TaskSource?.TrySetException(ex);
                }
            }
        }

        /// <summary>
        /// Analyses a SELECT statement and runs it on the reader service of the
        /// table it targets.
        /// </summary>
        /// <param name="sql">The SQL text; must be a SELECT.</param>
        /// <param name="parameters">Parameters bound to the statement.</param>
        /// <returns>The tuple produced by the reader service.</returns>
        /// <exception cref="Exception">The statement is not a SELECT.</exception>
        public async ValueTask<(string data, int total, string token)> ExecuteReaderAsync(string sql, params SqliteParameter[] parameters)
        {
            AnalysisResult analysis = this.GetAnalysisResult(sql);

            // Only queries may go through the reader path.
            if (analysis.FormatSqlResult.action != SQLActionOptions.SELECT)
            {
                throw new Exception("No query please use ExecuteNonQueryAsync");
            }

            var request = new AnalysisExecuteItemOptions(analysis, parameters);
            var tableReader = this.clusterService.GetService<iTableReaderService>(analysis.FormatSqlResult.tableName);

            return await tableReader.ExecuteReaderAsync(request);
        }

        /// <summary>
        /// Analyses a SELECT statement and returns the scalar produced by the
        /// reader service of the table it targets.
        /// </summary>
        /// <param name="sql">The SQL text; must be a SELECT.</param>
        /// <param name="parameters">Parameters bound to the statement.</param>
        /// <returns>The value returned by the reader service.</returns>
        /// <exception cref="Exception">The statement is not a SELECT.</exception>
        public async ValueTask<object> ExecuteScalarAsync(string sql, params SqliteParameter[] parameters)
        {
            AnalysisResult analysis = this.GetAnalysisResult(sql);

            // Only queries may go through the reader path.
            if (analysis.FormatSqlResult.action != SQLActionOptions.SELECT)
            {
                throw new Exception("No query please use ExecuteNonQueryAsync");
            }

            var request = new AnalysisExecuteItemOptions(analysis, parameters);
            var tableReader = this.clusterService.GetService<iTableReaderService>(analysis.FormatSqlResult.tableName);

            return await tableReader.ExecuteScalarAsync(request);
        }

        /// <summary>
        /// Analyses a non-SELECT statement and runs it directly (without batching)
        /// on the execute service of the table it targets.
        /// </summary>
        /// <param name="sql">The SQL text; must not be a SELECT.</param>
        /// <param name="parameters">Parameters bound to the statement.</param>
        /// <returns>The value returned by the execute service.</returns>
        /// <exception cref="Exception">The statement is a SELECT.</exception>
        public async ValueTask<long> ExecuteNonQueryAsync(string sql, params SqliteParameter[] parameters)
        {
            AnalysisResult analysis = this.GetAnalysisResult(sql);

            // Queries belong on the reader path.
            if (analysis.FormatSqlResult.action == SQLActionOptions.SELECT)
            {
                throw new Exception("No query please use ExecuteReaderAsync or ExecuteScalarAsync");
            }

            var request = new AnalysisExecuteItemOptions(analysis, parameters);
            var tableExecutor = this.clusterService.GetService<iTableExecuteService>(analysis.FormatSqlResult.tableName);

            return await tableExecutor.ExecuteNonQueryAsync(request);
        }

        /// <summary>
        /// Analyses a non-SELECT statement, queues it on the batching subject and
        /// waits until the item's task source has been completed.
        /// </summary>
        /// <param name="sql">The SQL text; must not be a SELECT.</param>
        /// <param name="parameters">Parameters bound to the statement.</param>
        /// <exception cref="Exception">The statement is a SELECT.</exception>
        public async ValueTask ExecuteNonQueryNoResultAsync(string sql, params SqliteParameter[] parameters)
        {
            AnalysisResult analysis = this.GetAnalysisResult(sql);

            // Queries belong on the reader path.
            if (analysis.FormatSqlResult.action == SQLActionOptions.SELECT)
            {
                throw new Exception("No query please use ExecuteReaderAsync or ExecuteScalarAsync");
            }

            var request = new AnalysisExecuteItemOptions(analysis, parameters);
            var pending = new BatchItem<AnalysisExecuteItemOptions>(request);

            // Hand the statement to the buffered pipeline, then wait for its outcome.
            this.subjectOfExecuteItems.OnNext(pending);
            await pending.TaskSource.Task;
        }

        /// <summary>
        /// Analyses a list of non-SELECT statements, queues all of them on the
        /// batching subject and waits until every one of them has been completed.
        /// </summary>
        /// <remarks>
        /// All statements are validated before any of them is queued, so a SELECT
        /// in the list is rejected without part of the list having been submitted.
        /// Each statement gets its own completion source: with one shared source
        /// the call returned as soon as the first table finished, and an empty
        /// list never returned at all.
        /// </remarks>
        /// <param name="executeItems">The statements to execute.</param>
        /// <exception cref="ArgumentNullException"><paramref name="executeItems"/> is null.</exception>
        /// <exception cref="Exception">One of the statements is a SELECT.</exception>
        public async ValueTask BatchExecuteNonQueryAsync(List<ExecuteItemOptions> executeItems)
        {
            if (executeItems == null)
            {
                throw new ArgumentNullException(nameof(executeItems));
            }

            if (executeItems.Count == 0)
            {
                // Nothing to queue, so there is nothing to wait for.
                return;
            }

            var pending = new List<BatchItem<AnalysisExecuteItemOptions>>(executeItems.Count);
            foreach (var item in executeItems)
            {
                var result = this.GetAnalysisResult(item.Sql);
                if (result.FormatSqlResult.action == SQL.AIHandle.SQLActionOptions.SELECT)
                {
                    throw new Exception("No query please use ExecuteReaderAsync or ExecuteScalarAsync");
                }

                var query = new AnalysisExecuteItemOptions(result, item.Parameters);
                pending.Add(new BatchItem<AnalysisExecuteItemOptions>(query));
            }

            foreach (var batchItem in pending)
            {
                this.subjectOfExecuteItems.OnNext(batchItem);
            }

            await Task.WhenAll(pending.Select(batchItem => (Task)batchItem.TaskSource.Task));
        }

        /// <summary>
        /// Analyses a SELECT statement and runs it on the typed reader service of
        /// the table it targets.
        /// </summary>
        /// <typeparam name="T">The row type handled by the typed reader service.</typeparam>
        /// <param name="sql">The SQL text; must be a SELECT.</param>
        /// <param name="parameters">Parameters bound to the statement.</param>
        /// <returns>The task returned by the reader service.</returns>
        /// <exception cref="Exception">
        /// The statement is not a SELECT. Thrown synchronously, because this
        /// method is not declared <c>async</c>.
        /// </exception>
        public Task<(List<T> data, int total, string token)> ExecuteReaderAsync<T>(string sql, params SqliteParameter[] parameters) where T : class, new()
        {
            AnalysisResult analysis = this.GetAnalysisResult(sql);

            // Only queries may go through the reader path.
            if (analysis.FormatSqlResult.action != SQLActionOptions.SELECT)
            {
                throw new Exception("No query please use ExecuteNonQueryAsync");
            }

            var request = new AnalysisExecuteItemOptions(analysis, parameters);
            var typedReader = this.clusterService.GetService<iTableReaderService<T>>(analysis.FormatSqlResult.tableName);

            return typedReader.ExecuteReaderAsync(request);
        }

        /// <summary>
        /// Runs the statements as one transaction while holding an exclusive work
        /// unit scope for every service address involved. Intended to make the
        /// batch succeed or fail as a whole; this is the strictest transaction
        /// level and blocks other operations while it runs.
        /// </summary>
        /// <param name="executeItems">The statements to run inside the transaction.</param>
        public async ValueTask ExecuteTransactionOfLockTableAsync(List<ExecuteItemOptions> executeItems)
        {
            // table => analysed statements
            Dictionary<string, List<AnalysisExecuteItemOptions>> tableAnalysisKeyValuePair = new Dictionary<string, List<AnalysisExecuteItemOptions>>();
            // service address => tables
            Dictionary<string, List<string>> serviceKeyValuePairs = new Dictionary<string, List<string>>();
            // table => execute service
            Dictionary<string, iTableExecuteService> tableServiceKeyValuePairs = new Dictionary<string, iTableExecuteService>();

            await this.AnalysisExecuteTransactionAsync(executeItems, tableAnalysisKeyValuePair, serviceKeyValuePairs, tableServiceKeyValuePairs);

            // One scope provider per service address involved in the transaction.
            var orderlyWorkScopes = serviceKeyValuePairs.Keys.Select(item => new OrderlyWorkScopeProvider(item));

            // Request the exclusive right to execute on each of them.
            var scopes = await Task.WhenAll(orderlyWorkScopes.Select(scope => scope.CreateWorkUnitScopeAsync()));

            try
            {
                // Run the transaction itself.
                await this.ExecuteTransactionAsync(executeItems, tableAnalysisKeyValuePair, serviceKeyValuePairs, tableServiceKeyValuePairs, true);
            }
            finally
            {
                // Release the scopes even when the transaction failed; otherwise
                // they would stay held after an exception.
                foreach (var scope in scopes)
                {
                    await scope.DisposeAsync();
                }
            }
        }

        /// <summary>
        /// Runs the statements as one transaction without taking the exclusive
        /// work unit scopes. Intended to make the batch succeed or fail as a
        /// whole, but other work may run concurrently, so phantom reads and
        /// writes are possible.
        /// </summary>
        /// <param name="executeItems">The statements to run inside the transaction.</param>
        public async ValueTask ExecuteTransactionAsync(List<ExecuteItemOptions> executeItems)
        {
            // Lookup tables filled in by the analysis step:
            //   table           => analysed statements
            //   service address => tables
            //   table           => execute service
            var analysesByTable = new Dictionary<string, List<AnalysisExecuteItemOptions>>();
            var tablesByAddress = new Dictionary<string, List<string>>();
            var servicesByTable = new Dictionary<string, iTableExecuteService>();

            await this.AnalysisExecuteTransactionAsync(executeItems, analysesByTable, tablesByAddress, servicesByTable);

            await this.ExecuteTransactionAsync(executeItems, analysesByTable, tablesByAddress, servicesByTable, isLockTableDB: false);
        }

        /// <summary>
        /// Analyses the statements of a transaction and fills the three lookup
        /// dictionaries passed in by the caller.
        /// </summary>
        /// <param name="executeItems">The statements to analyse; SELECT statements are skipped.</param>
        /// <param name="tableAnalysisKeyValuePair">Receives table name => analysed statements.</param>
        /// <param name="serviceKeyValuePairs">Receives registered service => table names.</param>
        /// <param name="tableServiceKeyValuePairs">Receives table name => execute service.</param>
        private async ValueTask AnalysisExecuteTransactionAsync(List<ExecuteItemOptions> executeItems,
            Dictionary<string, List<AnalysisExecuteItemOptions>> tableAnalysisKeyValuePair,
            Dictionary<string, List<string>> serviceKeyValuePairs,
            Dictionary<string, iTableExecuteService> tableServiceKeyValuePairs
            )
        {
            // Pass 1: group the analysed, non-SELECT statements by table.
            foreach (var executeItem in executeItems)
            {
                var analysis = this.GetAnalysisResult(executeItem.Sql);
                if (analysis.FormatSqlResult.action != SQLActionOptions.SELECT)
                {
                    var request = new AnalysisExecuteItemOptions(analysis, executeItem.Parameters);
                    var tableName = analysis.FormatSqlResult.tableName;

                    if (!tableAnalysisKeyValuePair.TryGetValue(tableName, out var requests))
                    {
                        requests = new List<AnalysisExecuteItemOptions>();
                        tableAnalysisKeyValuePair.Add(tableName, requests);
                    }

                    requests.Add(request);
                }
            }

            // Pass 2: resolve each table's execute service and group the tables
            // by the service each one reports as registered.
            // TODO: check which services are activated.
            foreach (var tableName in tableAnalysisKeyValuePair.Keys)
            {
                if (!tableServiceKeyValuePairs.TryGetValue(tableName, out var executeService))
                {
                    executeService = this.clusterService.GetService<iTableExecuteService>(tableName);
                    tableServiceKeyValuePairs[tableName] = executeService;
                }

                string registeredService = await executeService.GetRegisterServiceAsync();
                if (!serviceKeyValuePairs.TryGetValue(registeredService, out var tableNames))
                {
                    tableNames = new List<string>();
                    serviceKeyValuePairs.Add(registeredService, tableNames);
                }

                tableNames.Add(tableName);
            }
        }

        /// <summary>
        /// Drives one transaction across the transaction services of all involved
        /// nodes: begin, execute per table, commit, then mark it completed.
        /// </summary>
        /// <param name="executeItems">The original statements (not read by this method).</param>
        /// <param name="tableAnalysisKeyValuePair">Table name => analysed statements to execute.</param>
        /// <param name="serviceKeyValuePairs">Service address => table names; the keys select the nodes.</param>
        /// <param name="tableServiceKeyValuePairs">Table name => execute service.</param>
        /// <param name="isLockTableDB">Passed through to <c>BeginTransactionAsync</c>.</param>
        private async ValueTask ExecuteTransactionAsync(List<ExecuteItemOptions> executeItems,
            Dictionary<string, List<AnalysisExecuteItemOptions>> tableAnalysisKeyValuePair,
            Dictionary<string, List<string>> serviceKeyValuePairs,
            Dictionary<string, iTableExecuteService> tableServiceKeyValuePairs,
            bool isLockTableDB)
        {
            // Allocate the transaction id.
            var transactionKeyService = this.clusterService.GetService<iTransactionKeyService>();
            long tranId = await transactionKeyService.GetTransactiondKeyAsync();

            // Query the active hosts. The result is not used here; the call is kept
            // so the method still fails at this point if the query fails.
            // NOTE(review): confirm whether this call is still needed.
            await this.clusterService.GetManagementer().GetHosts(true);

            // Transaction services of the nodes hosting the tables. Materialised once:
            // as a lazy query every use below resolved each node's service again.
            var tableNoders = serviceKeyValuePairs.Keys
                .Select(address => clusterService.GetServiceByClusterNoder<iTransactionService>(address))
                .ToList();

            // The same table list is sent to every node, so build it only once.
            var tableNames = tableServiceKeyValuePairs.Keys.ToArray();

            try
            {
                // Begin the transaction on every table node.
                await Task.WhenAll(tableNoders.Select(item => item.BeginTransactionAsync(tranId, tableNames, isLockTableDB)));

                // Execute the statements table by table.
                await Task.WhenAll(tableAnalysisKeyValuePair.Select(item => tableServiceKeyValuePairs[item.Key].ExecuteTransactionAsync(item.Value, tranId)));
            }
            catch (Exception)
            {
                await Task.WhenAll(tableNoders.Select(item => item.RollbackAsync(tranId)));
                await Task.WhenAll(tableNoders.Select(item => item.ErrorTransactiondAsync(tranId)));
                throw;
            }

            try
            {
                // Commit on every table node.
                await Task.WhenAll(tableNoders.Select(item => item.CommitTransactionAsync(tranId)));

                // Mark the transaction completed (with retries).
                await this.TryComplateTransactiondAsync(tableNoders, tranId, 0);

                // Let every table service record the completed transaction.
                await Task.WhenAll(tableAnalysisKeyValuePair.Select(item => tableServiceKeyValuePairs[item.Key].ComplatedTransactionAsync()));
            }
            catch (Exception)
            {
                // NOTE(review): a failure in this phase is rolled back and flagged as an
                // error but not rethrown, so the caller cannot tell that the commit
                // failed. Confirm that this is intended.
                await this.TryComplateTransactiondAsync(tableNoders, tranId, 1);
                await this.TryComplateTransactiondAsync(tableNoders, tranId, 2);
            }

            try
            {
                await Task.WhenAll(tableNoders.Select(item => item.ComplateTransactiondAsync(tranId)));
            }
            catch (Exception ex)
            {
                // Failures are ignored here because the data has already been written.
                Console.WriteLine(ex.Message);
            }
        }


        #region Analysis
        /// <summary>
        /// Analyses a SQL statement and builds the <see cref="AnalysisResult"/>
        /// used by the execute and reader services. INSERT and DELETE statements
        /// are rewritten before they are stored in the result.
        /// </summary>
        /// <param name="sql">The SQL text to analyse.</param>
        /// <returns>The analysis result, including the (possibly rewritten) SQL.</returns>
        /// <exception cref="Exception">The statement analysis reported an error.</exception>
        public AnalysisResult GetAnalysisResult(string sql)
        {
            AnalysisResult analysis = new AnalysisResult();
            var statement = SqlAnalysisFactory.CreateStatementAnalysisInstance(sql);
            analysis.TableStructures = this.GetStructure(statement);

            if (statement.Context.SQLAction == SQLActionOptions.INSERT)
            {
                sql = AddIndexColumnToInsert(sql);
            }
            else if (statement.Context.SQLAction == SQLActionOptions.DELETE)
            {
                sql = EnsureDeleteHasFrom(sql);
            }

            analysis.CustomFunctions = statement.Context.CustomFunctions;
            analysis.SortByOptions = statement.Context.SortByOptions;
            if (analysis.SortByOptions?.Any() == true)
            {
                // Resolve the table name of every sort option from its table index.
                foreach (var item in analysis.SortByOptions)
                {
                    item.TableName = statement.Context.Tables[item.SubIndex].Value;
                }
            }

            // SELECT and INSERT carry no set-field list; other actions pass the
            // fields collected by the analysis.
            if (statement.Context.SQLAction == SQLActionOptions.SELECT || statement.Context.SQLAction == SQLActionOptions.INSERT)
            {
                analysis.FormatSqlResult = new FormatSqlResult(sql, statement.Context.SQLAction, statement.Context.Tables[0].Value, statement.Context.WhereCase, new long[0], new List<string>(0));
            }
            else
            {
                analysis.FormatSqlResult = new FormatSqlResult(sql, statement.Context.SQLAction, statement.Context.Tables[0].Value, statement.Context.WhereCase, new long[0], statement.Context.SetFields);
            }

            if (statement.Context.Indexs?.Any() == true)
            {
                analysis.Indexs = new List<SearchIndexOptions>();
                foreach (var item in statement.Context.Indexs)
                {
                    analysis.Indexs.Add(new SearchIndexOptions(item.Sql, item.IsHasNot, item.IndexType, item.ChildIndexFns, item.Table, item.PreviouToken, item.Page, item.Size));
                }
            }

            return analysis;
        }

        /// <summary>
        /// Inserts "_IDX," right after the first counted '(' of an INSERT
        /// statement and "@B_IDX," right after the second.
        /// </summary>
        private static string AddIndexColumnToInsert(string sql)
        {
            int one = 0, two = 0;
            for (int i = 0; i < sql.Length; i++)
            {
                if (sql[i] != '(')
                {
                    continue;
                }

                if (one > 0)
                {
                    two = i;
                    break;
                }

                one = i;
            }

            // NOTE(review): with fewer than two parentheses "two" stays 0 and
            // "@B_IDX," lands at index 1 — confirm such statements are rejected
            // by the analysis before this point.
            // Insert at the later position first so the earlier index stays valid.
            sql = sql.Insert(two + 1, "@B_IDX,");
            return sql.Insert(one + 1, "_IDX,");
        }

        /// <summary>
        /// Adds the FROM keyword to a DELETE statement when the word that follows
        /// the first single space is not FROM.
        /// </summary>
        private static string EnsureDeleteHasFrom(string sql)
        {
            if (sql.Length < 2)
            {
                return sql;
            }

            // First space after the leading character, then the character behind it.
            int firstSpace = sql.IndexOf(' ', 1);
            int next = firstSpace + 1;
            if (firstSpace < 0 || next >= sql.Length || sql[next] == ' ')
            {
                return sql;
            }

            // Bounds are checked so that a short statement such as "delete t"
            // no longer throws.
            bool startsWithFrom = next + 4 <= sql.Length
                && string.Compare(sql, next, "from", 0, 4, StringComparison.OrdinalIgnoreCase) == 0;

            // FROM only counts as the keyword when it is a whole word; a table
            // name that merely begins with "from" still needs the keyword.
            bool isFromKeyword = startsWithFrom && (next + 4 == sql.Length || sql[next + 4] == ' ');

            // The leading space keeps the inserted keyword separate from DELETE.
            return isFromKeyword ? sql : sql.Insert(next - 1, " from ");
        }
        /// <summary>
        /// Runs the statement analysis and returns the table structure it yields.
        /// </summary>
        /// <param name="statement">The statement analyser to run.</param>
        /// <returns>The dictionary returned by <c>GetTableStructure</c>.</returns>
        /// <exception cref="Exception">
        /// The analysis recorded an error (the first one is thrown), or
        /// <c>GetTableStructure</c> reported an error.
        /// </exception>
        private Dictionary<string, LinkedList<string>> GetStructure(SqlAnalysisBase statement)
        {
            // Validate the statement first.
            statement.Analysis();

            var analysisErrors = statement.Context.Errors;
            if (analysisErrors != null)
            {
                // Only the first recorded error is ever surfaced.
                foreach (var message in analysisErrors)
                {
                    throw new Exception(message);
                }
            }

            var structure = statement.GetTableStructure(out string structureError);
            if (string.IsNullOrWhiteSpace(structureError))
            {
                return structure;
            }

            throw new Exception(structureError);
        }

        /// <summary>
        /// Completes and disposes the batching subject. Does nothing when the
        /// subject is already disposed; any exception raised while shutting down
        /// is swallowed.
        /// </summary>
        public void Dispose()
        {
            if (this.subjectOfExecuteItems.IsDisposed)
            {
                return;
            }

            try
            {
                // Signal completion before releasing the subject.
                this.subjectOfExecuteItems.OnCompleted();
                this.subjectOfExecuteItems.Dispose();
            }
            catch
            {
                // Best effort: Dispose does not let shutdown errors escape.
            }
        }
        #endregion

        /// <summary>
        /// Runs one transaction step on every given node, retrying with a growing
        /// delay when it fails.
        /// </summary>
        /// <param name="tableNoders">The transaction services to call.</param>
        /// <param name="tranId">The transaction id passed to every call.</param>
        /// <param name="action">
        /// 0 = <c>ComplateTransactiondAsync</c>, 1 = <c>RollbackAsync</c>,
        /// 2 = <c>ErrorTransactiondAsync</c>.
        /// </param>
        /// <param name="retry">Number of attempts already made; callers normally leave it at 0.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="action"/> is not 0, 1 or 2.</exception>
        private async Task TryComplateTransactiondAsync(IEnumerable<iTransactionService> tableNoders, long tranId, int action, int retry = 0)
        {
            // An unknown action can never succeed, so reject it up front instead
            // of retrying it with delays.
            if (action < 0 || action > 2)
            {
                throw new ArgumentOutOfRangeException(nameof(action), action, "no find action:" + action);
            }

            while (true)
            {
                retry++;
                try
                {
                    switch (action)
                    {
                        case 0:
                            await Task.WhenAll(tableNoders.Select(item => item.ComplateTransactiondAsync(tranId)));
                            break;
                        case 1:
                            await Task.WhenAll(tableNoders.Select(item => item.RollbackAsync(tranId)));
                            break;
                        case 2:
                            await Task.WhenAll(tableNoders.Select(item => item.ErrorTransactiondAsync(tranId)));
                            break;
                    }

                    return;
                }
                catch (Exception) when (retry <= 8)
                {
                    // Back off a little longer after each failed attempt. Once the
                    // filter stops matching, the last exception propagates with its
                    // original stack trace.
                    await Task.Delay(retry * 100);
                }
            }
        }
    }



}
